Submitted by: Sidharth Vishnu Bhakth
from scipy.io import loadmat, wavfile
from PIL import Image
import librosa
import numpy as np
import pandas as pd
from scipy.stats import norm, mode
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import Audio
import warnings
warnings.filterwarnings("ignore")
# Load the clean speech and the noise recordings, both resampled to 16 kHz
s, _ = librosa.load('data\\trs.wav', sr=16000)
display(s.shape)
n, _ = librosa.load('data\\trn.wav', sr=16000)
display(n.shape)
# Synthesize the noisy training signal by mixing speech and noise
x = s + n
display(x.shape)
# Listen to the clean, noise, and noisy signals
for signal in (s, n, x):
    display(Audio(signal, rate=16000))
def _magnitude_spectrogram(signal):
    """Magnitude STFT with a 1024-point Hann window and 50% overlap."""
    return np.abs(librosa.stft(signal, n_fft=1024, win_length=1024,
                               hop_length=512, window='hann'))

S = _magnitude_spectrogram(s)
N = _magnitude_spectrogram(n)
X = _magnitude_spectrogram(x)
S.shape, N.shape, X.shape
# Ideal Binary Mask (IBM): 1 where speech dominates noise, 0 elsewhere
M = (S > N).astype(float)
display(M)
display(np.sum(M))
def sigmoid(z):
    """Elementwise logistic function 1 / (1 + exp(-z))."""
    denom = 1 + np.exp(-z)
    return 1 / denom
def ReLU(z):
    """Rectified linear unit: elementwise max(z, 0)."""
    return np.maximum(z, 0)
def ReLU_prime(z):
    """Derivative of ReLU: 1.0 where z > 0, else 0.0 (0 at z == 0)."""
    return np.where(z > 0, 1.0, 0.0)
class NeuralNetwork(object):
    """Single-hidden-layer network that learns a time-frequency mask.

    Maps a magnitude spectrogram (freq_bins x frames) to a mask of the same
    shape; the hidden layer uses ReLU, the output layer uses sigmoid, and
    training is full-batch gradient descent on the sum-of-squared errors.
    """

    def __init__(self, h_layer_size=50, n_epochs=5000, lr=0.0001):
        self.h_layer_size = h_layer_size  # number of hidden units
        self.n_epochs = n_epochs          # full-batch gradient steps
        self.lr = lr                      # learning rate

    def train(self, X, y):
        """Fit the network.

        X : array (n_features, n_frames) -- inputs, one frame per column
        y : array (n_features, n_frames) -- target mask
        Returns self; per-epoch SSE is recorded in ``self.total_loss``.
        """
        # Number of input features (rows); each column of X is one sample.
        # (The original comment called this "No of samples".)
        n = X.shape[0]
        # Initialize weights with scaled Gaussians, biases with zeros
        self.weights_h = np.random.randn(n, self.h_layer_size) * np.sqrt(1 / n)
        self.bias_h = np.zeros([self.h_layer_size, 1])
        self.weights_o = np.random.randn(self.h_layer_size, n) * np.sqrt(1 / self.h_layer_size)
        self.bias_o = np.zeros([n, 1])
        self.total_loss = list()
        for epoch in range(self.n_epochs):
            """ Forward propagation """
            # Hidden layer
            Z_h = np.dot(self.weights_h.T, X) + self.bias_h
            activation_h = ReLU(Z_h)
            # Output layer
            Z_o = np.dot(self.weights_o.T, activation_h) + self.bias_o
            activation_o = sigmoid(Z_o)
            # Sum-of-squared-errors loss
            L = (1 / 2) * np.sum(np.square(activation_o - y))
            self.total_loss.append(L)
            # Derivative of the loss w.r.t. the prediction
            dL = activation_o - y
            """ Backpropagation """
            gradient_o = dL * activation_o * (1 - activation_o)
            gradient_h = np.dot(self.weights_o, gradient_o) * ReLU_prime(Z_h)
            """ Gradient descent """
            # Update weights
            self.weights_h -= self.lr * np.dot(X, gradient_h.T)
            self.weights_o -= self.lr * np.dot(activation_h, gradient_o.T)
            # Update biases.  BUG FIX: the original summed the gradient over
            # *all* units and frames and subtracted that single scalar from
            # every bias entry; each bias must accumulate only its own unit's
            # gradient, summed over frames.
            self.bias_h -= self.lr * np.sum(gradient_h, axis=1, keepdims=True)
            self.bias_o -= self.lr * np.sum(gradient_o, axis=1, keepdims=True)
            if (epoch + 1) % 1000 == 0:
                print('Epoch: {} || SSE: {}'.format(epoch + 1, round(L, 2)))
                print('\n')
        return self

    def predict(self, X_test):
        """Forward pass with the learned parameters; returns the soft mask."""
        # Hidden layer
        Z_h = np.dot(self.weights_h.T, X_test) + self.bias_h
        activation_h = ReLU(Z_h)
        # Output layer
        Z_o = np.dot(self.weights_o.T, activation_h) + self.bias_o
        self.y_hat = sigmoid(Z_o)
        return self.y_hat
For the activation of the hidden layer I have used ReLU, and for the output layer I have used sigmoid. There are 50 units in the hidden layer and the learning rate is 0.0001. I train the neural network for 5000 epochs.
# Seed the RNG so the reported results are reproducible
np.random.seed(123)
NN = NeuralNetwork()
NN.train(X, M)
loss = NN.total_loss
# Visualize how the SSE evolves over training
plt.plot(loss)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.show()
# Load the noisy test signal and its clean reference at 16 kHz
x_test, _ = librosa.load('data/tex.wav', sr=16000)
display(x_test.shape)
s_test, _ = librosa.load('data/tes.wav', sr=16000)
display(s_test.shape)
# Complex STFT of the noisy test signal (kept complex for reconstruction)
X_test = librosa.stft(x_test, n_fft=1024, win_length=1024, hop_length=512, window='hann')
# Predict the soft mask from the magnitude spectrogram
M_test = NN.predict(np.abs(X_test))
# Apply the mask and invert back to the time domain
s_hat = librosa.istft(X_test * M_test, hop_length=512, win_length=1024, window='hann')
# Truncate the reference so both signals share the same length
s_test = s_test[:len(s_hat)]
Audio(s_hat, rate=16000)
plt.plot(s_test)
plt.show()
plt.plot(s_hat)
plt.show()
# SNR = 10 * log10(||s||^2 / ||s - s_hat||^2)
num = np.dot(s_test, s_test)
den = np.dot((s_test - s_hat), (s_test - s_hat))
SNR = 10 * np.log10(num / den)
print("Signal-to-Noise ratio of cleaned-up test speech signal is {}".format(round(SNR, 2)))
The SNR varies with different initializations of the neural network weights, so I have set a seed to reproduce the best value obtained so far.
# Load the left/right stereo pair as arrays
X_L = np.array(Image.open('data/im0.ppm'))
X_R = np.array(Image.open('data/im8.ppm'))
# BUG FIX: the original displayed X_L.shape twice; show both images' shapes
X_L.shape, X_R.shape
# Show the two views side by side
fig, ax = plt.subplots(1, 2)
fig.set_size_inches(12, 8)
ax[0].imshow(X_L), ax[1].imshow(X_R)
plt.show()
# Function to find the closest pixel b/w left and right images using index-distance
def closest_pixel(i, j, left_img=None, right_img=None, max_disp=40):
    """Return the disparity (0..max_disp-1) for right-image pixel (i, j).

    Scans ``max_disp`` candidate pixels to the right in the left image and
    returns the horizontal offset whose color is closest in L1 distance.
    ``left_img``/``right_img`` default to the globals ``X_L``/``X_R`` so the
    original call sites keep working unchanged.
    """
    if left_img is None:
        left_img = X_L
    if right_img is None:
        right_img = X_R
    # BUG FIX: the images are uint8, so ``left - right`` wrapped around
    # modulo 256 before the absolute value was taken; compare in a signed
    # integer type instead.
    right = right_img[i, j].astype(np.int64)
    closest = 0
    min_dist = np.inf
    for k in range(max_disp):
        left = left_img[i, j + k].astype(np.int64)
        dist = np.sum(np.abs(left - right))
        if dist < min_dist:
            min_dist = dist
            closest = k
    return closest
# Disparity map: drop the last 40 columns so every candidate scan stays in bounds
D = np.zeros([X_L.shape[0], X_L.shape[1] - 40])
for i in range(D.shape[0]):
    for j in range(D.shape[1]):
        D[i, j] = closest_pixel(i, j)
D.shape
# Flatten the disparities into a single-feature design matrix
X = D.flatten().reshape(-1, 1).astype('float64')
# Histogram of the disparity values
plt.hist(X, bins=40)
plt.title('Histogram of the disparities')
plt.show()
# Kernel-density view of the same distribution
sns.distplot(X, hist=False, kde=True)
plt.title('Density plot of the disparities')
plt.show()
There seem to be six peaks in the data, so we can use 6 Gaussians to cluster the data with 200 iterations of the EM algorithm.
def GMM_clustering(X, n_gaussians=6, n_iters=1500):
    """Cluster 1-D data into ``n_gaussians`` components with EM.

    Parameters
    ----------
    X : array of shape (n_samples, 1) -- the data to cluster
    n_gaussians : number of mixture components
    n_iters : maximum EM iterations (stops early on convergence)

    Returns (mu, sigma, df): component means, standard deviations, and a
    DataFrame holding each sample ('X') and its hard label ('cluster').
    """
    # Bootstrap: random hard assignment of every point to a cluster
    df = pd.DataFrame(X, columns=['X'])
    df['cluster'] = np.random.choice(np.arange(n_gaussians), len(df))
    # Mixing weights, means and SDs from the random assignment.
    # (Explicit column reductions, ddof=0, replace the version-fragile
    # ``np.mean(df[...])['X']`` pattern with identical values.)
    P = np.array([len(df[df['cluster'] == k]) / len(df) for k in range(n_gaussians)])
    mu = np.array([df.loc[df['cluster'] == k, 'X'].to_numpy().mean() for k in range(n_gaussians)])
    sigma = np.array([df.loc[df['cluster'] == k, 'X'].to_numpy().std() for k in range(n_gaussians)])
    # Likelihood p(x | k) and posterior p(k | x)
    L = np.zeros([len(X), n_gaussians])
    U = np.zeros([len(X), n_gaussians])
    for n in range(n_iters):
        mu_prev, sigma_prev = mu.copy(), sigma.copy()
        """ E step """
        for i in range(n_gaussians):
            L[:, i] = norm.pdf(df['X'], mu[i], sigma[i])
        # Avoid division by zero in the posterior
        L[L == 0] = 1e-5
        # Evidence p(x): mixture over components
        P_X = np.sum(P * L, axis=1)
        # Compute posterior responsibilities
        U = (P * L) / P_X.reshape(-1, 1)
        """ M step """
        P = np.mean(U, axis=0)
        for i in range(n_gaussians):
            mu[i] = np.dot(U[:, i].T, df['X']) / np.sum(U[:, i])
            sigma[i] = np.sqrt(np.sum(U[:, i] * (df['X'] - mu[i]) ** 2) / np.sum(U[:, i]))
        # BUG FIX: the original stopped when the means OR the SDs had
        # converged, which can halt EM while the other parameter set is
        # still moving; require both to be stable.
        if np.allclose(mu_prev, mu, rtol=1e-5, atol=1e-8) and np.allclose(sigma_prev, sigma, rtol=1e-5, atol=1e-8):
            break
        if (n + 1) % 100 == 0:
            print("No of iterations: {}".format(n + 1))
            print("Cluster means: {}".format(mu))
            print("Cluster SDs: {}".format(sigma))
            print("\n")
    # Hard assignment from the final posteriors
    df['cluster'] = np.argmax(U, axis=1)
    return mu, sigma, df
# NOTE: no argument -> a fresh, non-reproducible seed on every run
np.random.seed()
mu, sigma, cluster_df = GMM_clustering(X)
cluster_map = np.array(cluster_df['cluster']).reshape(D.shape)
# BUG FIX: the original copied the *integer* label array and assigned the
# float means into it (truncating them), and relabelled in place so a mean
# that collided with a later label index got replaced again.  Index the
# mean vector by the labels instead: one vectorized, float-valued lookup.
depth_map = np.asarray(mu)[np.array(cluster_df['cluster'])].reshape(D.shape)
plt.imshow(depth_map, cmap="gray")
plt.title("Depth Map")
plt.show()
There is a lot of noise in the depth map obtained from the GMM. We can try to improve upon this using Gibbs sampling to estimate the posterior distributions.
# Initialize arrays for the smoothed cluster and depth maps
prev_cluster_map = np.copy(cluster_map)
smoothened_cluster_map = np.copy(cluster_map)
# BUG FIX: force a float dtype -- when depth_map carries integer values,
# np.zeros_like would create an int array that truncates the Gaussian
# means written into it during Gibbs sampling.
smoothened_depth_map = np.zeros_like(depth_map, dtype=float)
# Function to compute similarity score of i,j belonging to cluster
def similarity(i, j, cluster, label_map=None):
    """MRF pairwise potential for pixel (i, j) supporting label ``cluster``.

    Returns 1 when (i, j) lies outside the image or already carries
    ``cluster`` (equivalent to exp(-0)), and exp(-mean^2 / (2*variance^2))
    otherwise.  ``label_map`` defaults to the global ``prev_cluster_map`` so
    the original call sites keep working.
    """
    if label_map is None:
        label_map = prev_cluster_map
    mean = 5
    variance = 0.5
    # Out-of-bounds and agreeing neighbors contribute a neutral factor of 1.
    # (The original had a second, unreachable ``== cluster`` test whose
    # ``mean = 0`` branch also evaluated to exp(0) == 1 -- dead code removed.)
    if i < 0 or j < 0 or i > label_map.shape[0] - 1 or j > label_map.shape[1] - 1 or label_map[i, j] == cluster:
        return 1
    return np.exp(-(mean * mean / (2 * variance ** 2)))
# Function to calculate prior probability based on 8-neighboring nodes of i,j belonging to cluster
def prior(i, j, cluster):
    """Product of similarity potentials over the 3x3 patch around (i, j).

    Note: the offsets cover all nine positions, so the center pixel itself
    contributes alongside its 8 neighbors.
    """
    offsets = (-1, 0, 1)
    p = 1
    for dx in offsets:
        for dy in offsets:
            p *= similarity(i + dx, j + dy, cluster)
    return p
# Compute posterior over labels using Gibbs sampling (15 sweeps)
n_gaussians = 6
depth_map_array = list()
for n in range(15):
    for i in range(cluster_map.shape[0]):
        for j in range(cluster_map.shape[1]):
            # Posterior of each label: data likelihood times MRF prior
            posterior = np.zeros(n_gaussians)
            for k in range(n_gaussians):
                posterior[k] = norm.pdf(depth_map[i, j], mu[k], sigma[k]) * prior(i, j, k)
            posterior = posterior / np.sum(posterior)
            # Sample the pixel's new label from the normalized posterior
            new_label = np.random.choice(np.arange(0, n_gaussians), p=posterior)
            smoothened_depth_map[i, j] = mu[new_label]
            smoothened_cluster_map[i, j] = new_label
    # BUG FIX: the original appended the same array object every sweep, so
    # all 15 stored maps aliased the final one and the later mode over them
    # was meaningless; store a snapshot instead.
    depth_map_array.append(smoothened_depth_map.copy())
    prev_cluster_map = np.array(smoothened_cluster_map)
    if (n + 1) % 5 == 0:
        print("{} iterations completed!".format(n + 1))
# Stack the per-sweep maps into one (n_sweeps, H, W) array
depth_map_array = np.array(depth_map_array)
# Pixel-wise most frequent value across the Gibbs sweeps.
# scipy.stats.mode reduces over axis 0 here; [0][0] pulls the 2-D mode
# array out of the (mode, count) result.
smoothened_depth_map_final = mode(depth_map_array)[0][0]
# Plot the smoothed depth map from Gibbs sampling
plt.imshow(smoothened_depth_map_final, cmap='gray')
plt.show()
# Sonar training data: 2-D features and +/-1 class labels
trX_mat = loadmat('data/trX.mat')
trY_mat = loadmat('data/trY.mat')
X = trX_mat['trX']
y = trY_mat['trY']
X.shape, y.shape
class AdaBoostPerceptron(object):
    """AdaBoost ensemble whose weak learners are tanh perceptrons.

    Each boosting round trains one perceptron with sample-weighted gradient
    descent, weighs it by its weighted accuracy, adds its vote to the
    ensemble score, and re-weights the samples.
    """

    def __init__(self, n_estimators=1500, n_epochs=1000, learning_rate=0.0005):
        self.n_estimators = n_estimators    # number of weak learners
        self.n_epochs = n_epochs            # gradient steps per weak learner
        self.learning_rate = learning_rate

    def train(self, X, y):
        """Fit the ensemble.

        X : array (n_features, n_samples)
        y : array (1, n_samples) of +/-1 labels
        Returns self; per-round errors/accuracies/weights are stored on self.
        """
        # No of features (d) and no of samples (n)
        d, n = X.shape
        self.f_x = 0  # running weighted ensemble score per sample
        # AdaBoost weight of each sample, updated after every round
        self.W_samples = np.ones((1, n))
        self.sample_weights = list()
        self.W_learner = list()
        self.total_error = list()
        self.accuracy = list()
        self.perceptron_weights = list()
        self.perceptron_bias = list()
        for m in range(self.n_estimators):
            # Fresh random initialization for this round's perceptron
            self.W = np.random.randn(1, d) * (1 / np.sqrt(n))
            self.b = np.random.randn()
            for epoch in range(self.n_epochs):
                y_hat = np.tanh(np.dot(self.W, X) + self.b)
                # Gradient of the sample-weighted squared error
                gradient = -2 * self.W_samples * (y - y_hat) * (1 - y_hat ** 2)
                """ Gradient descent """
                # Update weights and bias
                self.W -= self.learning_rate * np.dot(gradient, X.T)
                self.b -= self.learning_rate * np.dot(gradient, np.ones_like(y).T)
            self.perceptron_weights.append(self.W)
            self.perceptron_bias.append(self.b)
            # Learner weight: half the log-odds of weighted correct vs incorrect
            self.beta = (1 / 2) * np.log(np.sum(self.W_samples * (np.sign(y_hat) == y)) / np.sum(self.W_samples * (np.sign(y_hat) != y)))
            self.W_learner.append(self.beta)
            # Add this learner's signed vote to the ensemble score
            self.f_x += self.beta * np.sign(y_hat)
            error = np.sum(np.sign(self.f_x) != y)
            self.total_error.append(error)
            # BUG FIX: the original appended self.W_samples itself and then
            # updated it in place with *=, so every stored row aliased the
            # final weights; snapshot before the in-place update.
            self.sample_weights.append(self.W_samples.copy())
            self.W_samples *= np.exp(-self.beta * y * np.sign(y_hat))
            self.accuracy.append(round((np.sum(np.sign(self.f_x) == y) / n) * 100, 2))
            if (m + 1) % 250 == 0:
                print("No of learners:", m + 1)
                print("Accuracy: {}%".format(self.accuracy[-1]))
                print('\n')
        self.W_learner = np.array(self.W_learner).reshape(-1, 1)
        self.perceptron_weights = np.array(self.perceptron_weights).reshape(self.n_estimators, d)
        self.perceptron_bias = np.array(self.perceptron_bias).reshape(-1, 1)
        self.sample_weights = np.array(self.sample_weights).reshape(self.n_estimators, n)
        return self
# Initialize and fit the AdaBoost ensemble (fresh random seed each run)
np.random.seed()
clf = AdaBoostPerceptron()
clf.train(X, y)
# Misclassification count versus ensemble size
total_error = clf.total_error
plt.plot(total_error)
plt.xlabel('Number of learners')
plt.ylabel('Incorrect classifications')
plt.show()
# Training accuracy versus ensemble size
accuracy = clf.accuracy
plt.plot(accuracy)
plt.xlabel('Number of learners')
plt.ylabel('Classification accuracy')
plt.show()
The number of samples classified incorrectly decreases (and consequently the classification accuracy increases) as the number of learners increases.
print('The classification accuracy obtained on training data with {} weak learners is {}%'.format(clf.n_estimators, accuracy[-1]))
# Weights assigned to each learner
W_learner = clf.W_learner
# Weights assigned to each sample (one row per boosting round)
sample_weights = clf.sample_weights
# Weight of each perceptron
perceptron_weights = clf.perceptron_weights
# Bias of each perceptron
perceptron_bias = clf.perceptron_bias
# Split the samples by class for plotting
index_metal = np.where(y == 1)[1]
x_metal = X[0, index_metal]
y_metal = X[1, index_metal]
index_rock = np.where(y == -1)[1]
x_rock = X[0, index_rock]
y_rock = X[1, index_rock]
# Dense grid covering the data range for the decision contours
x_min, x_max = np.min(X[0, :]) - 0.01, np.max(X[0, :]) + 0.01
y_min, y_max = np.min(X[1, :]) - 0.01, np.max(X[1, :]) + 0.01
x0, x1 = np.meshgrid(np.arange(x_min, x_max, 0.005), np.arange(y_min, y_max, 0.005))
XX = np.c_[x0.flatten(), x1.flatten()].T
# Ensemble score over the grid: weighted sum of each perceptron's tanh output
contours = np.sum(W_learner * (np.tanh(np.dot(perceptron_weights, XX) + perceptron_bias)), axis=0)
fig, ax = plt.subplots()
cs = ax.contourf(x0, x1, contours.reshape(x0.shape), 15, cmap='coolwarm')
# BUG FIX: the original passed the whole (n_estimators, n_samples) weight
# matrix as marker sizes; scatter needs one size per plotted point, so use
# the final round's weights restricted to each class.
ax.scatter(x_metal, y_metal, marker="o", s=20 * sample_weights[-1, index_metal])
ax.scatter(x_rock, y_rock, marker="x", s=20 * sample_weights[-1, index_rock])
plt.xlabel('loudness')
plt.ylabel('noisiness')
plt.show()
The size of the plotted point represents the weight assigned to the sample after training.
# Twitter term-frequency matrices and one-hot label matrices
twitter = loadmat('data\\twitter.mat')
X_tr = twitter['Xtr']
Y_tr = twitter['YtrMat']
X_te = twitter['Xte']
Y_te = twitter['YteMat']
X_tr.shape, Y_tr.shape, X_te.shape, Y_te.shape
# Initialize topic matrix B (terms x topics) and training weights Theta with random values
B = np.random.rand(891, 50)
Theta_tr = np.random.rand(50, 773)
# Learn topics and weights from training data using PLSI multiplicative updates
for i in range(500):
    denom = np.dot(B, Theta_tr)
    denom[denom == 0] = 1e-4  # guard against division by zero
    # Update topics B
    B *= np.dot((X_tr / denom), Theta_tr.T)
    # Column-normalize B.  PERF FIX: the original multiplied by an 891x891
    # all-ones matrix just to replicate the column sums in every row;
    # summing over axis 0 gives the identical result without the O(n^2) product.
    B /= np.sum(B, axis=0, keepdims=True)
    denom = np.dot(B, Theta_tr)
    denom[denom == 0] = 1e-4
    # Update the per-document topic weights, then column-normalize likewise
    Theta_tr *= np.dot(B.T, (X_tr / denom))
    Theta_tr /= np.sum(Theta_tr, axis=0, keepdims=True)
# Initialize Theta for the test documents using random values
Theta_te = np.random.rand(50, 193)
# Learn test-document weights with the topic matrix B held fixed
for i in range(500):
    denom = np.dot(B, Theta_te)
    denom[denom == 0] = 1e-4  # guard against division by zero
    # Update weights for the test documents
    Theta_te *= np.dot(B.T, (X_te / denom))
    # PERF FIX: direct column sums replace the original's all-ones matrix
    # product, which computed the same normalizer in O(n^2).
    Theta_te /= np.sum(Theta_te, axis=0, keepdims=True)
def softmax(z):
    """Column-wise softmax; subtracts the global max for numerical stability."""
    shifted = z - np.max(z)
    exp_z = np.exp(shifted)
    return exp_z / exp_z.sum(axis=0)
class SoftmaxPerceptron(object):
    """Multinomial logistic-regression ("softmax perceptron") classifier.

    Trained with full-batch gradient descent on the cross-entropy loss
    against one-hot targets.
    """

    def __init__(self, n_epochs=10000, learning_rate=0.0001):
        self.n_epochs = n_epochs
        self.learning_rate = learning_rate

    def train(self, X, y):
        """Fit the classifier.

        X : array (n_features, n_samples)
        y : array (n_classes, n_samples), one-hot columns
        Returns self; loss/accuracy histories are stored on self.
        """
        # GENERALIZATION: derive the class count from y instead of the
        # original hard-coded 3; unchanged behavior for 3-class targets.
        n_classes = y.shape[0]
        # Initialize weights and bias
        self.W = np.random.randn(n_classes, X.shape[0]) * 1 / np.sqrt(X.shape[0])
        self.b = np.random.randn(n_classes, 1)
        self.training_loss = list()
        self.training_accuracy = list()
        for epoch in range(self.n_epochs):
            y_hat = softmax(np.dot(self.W, X) + self.b)
            # Cross-entropy loss against the one-hot targets
            loss = -np.sum(y * np.log(y_hat))
            self.training_loss.append(loss)
            # Gradients of the cross-entropy w.r.t. W and b
            delta_W = np.dot((y_hat - y), X.T)
            delta_b = np.dot((y_hat - y), np.ones((y.shape[1], 1)))
            """ Gradient descent """
            # Update weights and bias
            self.W -= self.learning_rate * delta_W
            self.b -= self.learning_rate * delta_b
            accuracy = np.sum(np.argmax(y_hat, axis=0) == np.argmax(y, axis=0)) / y.shape[1]
            self.training_accuracy.append(round(accuracy * 100, 2))
            if (epoch + 1) % 1000 == 0:
                print('Epoch', epoch + 1)
                print('Training loss: {}'.format(round(self.training_loss[-1], 4)))
                print('Training accuracy: {}%'.format(self.training_accuracy[-1]))
                print('\n')
        return self

    def predict(self, X_test, y_test):
        """Score X_test with the learned parameters and print accuracy."""
        self.y_pred = softmax(np.dot(self.W, X_test) + self.b)
        accuracy = np.sum(np.argmax(self.y_pred, axis=0) == np.argmax(y_test, axis=0)) / y_test.shape[1]
        print('Classification accuracy on test data: {}%'.format(round(accuracy * 100, 2)))
        return self
# Initialize and train the softmax classifier on the PLSI topic weights
np.random.seed()
clf = SoftmaxPerceptron()
clf.train(Theta_tr, Y_tr)
# Training-loss curve
plt.plot(clf.training_loss)
plt.show()
# Evaluate on the held-out documents
clf.predict(Theta_te, Y_te)